package com.jianying.day04;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * ClassName: Flink09_SInk_Kafka
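 * Package: com.jianying.day04
 * Description: Demo job that reads text lines from a socket and writes each line to a Kafka topic through a KafkaSink.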
 *
 * @Author: tubage
 * @Create: 2024/4/2 15:47
 * @Version: 1.0
 */
public class Flink09_SInk_Kafka {
    /** Host of the socket the job reads text lines from. */
    private static final String SOCKET_HOST = "localhost";

    /** Port of the socket the job reads text lines from. */
    private static final int SOCKET_PORT = 8888;

    /** Comma-separated list of Kafka bootstrap servers handed to the sink. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";

    /** Kafka topic every record is written to. */
    private static final String TOPIC = "first";

    /** Name the job is submitted under, so it can be identified in the Flink UI and logs. */
    private static final String JOB_NAME = "Flink09_SInk_Kafka";

    /**
     * Builds and runs the streaming job: lines read from the socket source are echoed to
     * stdout and written to the Kafka topic {@value #TOPIC}.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream(SOCKET_HOST, SOCKET_PORT);

        // Echo every incoming line to stdout so the input can be followed while the job runs.
        socketDS.print();
        socketDS.sinkTo(buildKafkaSink());

        env.execute(JOB_NAME);
    }

    /**
     * Creates the Kafka sink that writes each record's string value to {@value #TOPIC}.
     *
     * <p>No delivery guarantee is configured here, so the connector's default applies.
     * NOTE(review): switching to exactly-once delivery would additionally need
     * {@code setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)} and a non-empty
     * {@code setTransactionalIdPrefix(...)}; confirm against the Flink Kafka connector
     * documentation for the version in use before enabling it.
     *
     * @return a sink serializing {@code String} records with {@link SimpleStringSchema}
     */
    private static KafkaSink<String> buildKafkaSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic(TOPIC)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                .build();
    }
}
